package com.xuxueli.commontdemo.commont;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;
import com.xuxueli.commontdemo.entity.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * NIO socket server.
 *
 * <p>Accepts TCP clients on a single selector loop, decodes whatever bytes each read
 * returns as UTF-8 text, and forwards that text (expected to be a JSON-encoded
 * {@link Topic}) to Kafka. Nothing is written back to the client.
 *
 * <p>NOTE(review): there is no message framing. Each successful read is treated as one
 * complete JSON document, so a message split across TCP segments, two messages coalesced
 * into one read, or a message longer than the read buffer will fail to parse and is
 * dropped. Confirm that clients send one small message per write.
 */
@Component
public class SocketServer {

    // Reused for every message instead of building a new mapper per call.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Size in bytes of the buffer used for a single read from a client.
    private static final int READ_BUFFER_SIZE = 1024;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Charset used to decode the bytes received from clients.
    private final Charset cs = Charset.forName("UTF-8");

    // Receive buffer; cleared before every read. It is only touched from the selector
    // loop, so a single instance must not run two loops concurrently.
    private final ByteBuffer rBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);

    /**
     * Starts the socket server and processes events until a fatal error occurs.
     *
     * <p>This call blocks the calling thread. Failures while handling a single key are
     * printed and do not stop the loop; failures of the selector itself (or of the
     * initial bind) end the method after printing the stack trace. The server channel
     * and the selector are closed when the method returns.
     *
     * @param port TCP port to listen on
     */
    public void startSocketServer(int port) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Non-blocking mode is required before a channel can be registered.
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select(); // blocks until at least one registered channel is ready
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    try {
                        handle(key);
                    } catch (IOException | RuntimeException e) {
                        // One misbehaving connection must not take the whole server down.
                        e.printStackTrace();
                        if (key.channel() instanceof SocketChannel) {
                            closeQuietly(key);
                        }
                    }
                }
                selectionKeys.clear(); // the selector never removes handled keys itself
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new client, or reads one chunk from an existing
     * client and forwards it to Kafka.
     *
     * @param selectionKey key reported as ready by the selector
     * @throws IOException if accepting, registering or reading the channel fails
     */
    private void handle(SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            try {
                socketChannel.configureBlocking(false);
                // Register on the selector that produced this key.
                socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            } catch (IOException | RuntimeException e) {
                // Do not leak a socket that could not be registered.
                socketChannel.close();
                throw e;
            }
            System.out.println("客户端连接成功");
        } else if (selectionKey.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            rBuffer.clear();
            int count = socketChannel.read(rBuffer);
            if (count < 0) {
                // End of stream: the peer closed the connection. Without closing the
                // channel here it stays readable forever and the loop spins.
                closeQuietly(selectionKey);
                return;
            }
            if (count > 0) {
                rBuffer.flip();
                // toString() covers only the decoded characters; array() would expose the
                // whole backing array, which may contain unused trailing chars.
                String requestMsg = cs.decode(rBuffer).toString();
                String responseMsg = "客户端消息：" + requestMsg;
                System.out.println(responseMsg);
                try {
                    sendKafka(requestMsg);
                } catch (JsonProcessingException | RuntimeException e) {
                    // A malformed or unsendable message is dropped; the connection stays open.
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Cancels the key and closes its channel, ignoring any error raised while closing.
     *
     * @param key key whose channel should be released
     */
    private void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken channel fails.
        }
    }

    /**
     * Parses a client message as a JSON {@link Topic} and publishes its value to the
     * Kafka topic it names.
     *
     * @param msg JSON text received from the client
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link Topic}
     * @throws JsonProcessingException if the text is not valid JSON
     * @throws IllegalArgumentException if the message deserializes to {@code null}
     */
    private void sendKafka(String msg) throws JsonMappingException, JsonProcessingException {
        Topic topic = OBJECT_MAPPER.readValue(msg, Topic.class);
        if (topic == null) {
            // readValue yields null for the JSON literal "null".
            throw new IllegalArgumentException("Message did not contain a Topic: " + msg);
        }
        kafkaTemplate.send(topic.getTopic(), String.valueOf(topic.getValus()));
    }
}
